package com.sea.bei;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Demo job that runs Flink CDC through the Flink SQL API.
 *
 * <p>Date: 2021/11/8-23:53
 *
 * <p>Notes:
 * <ol>
 *   <li>Running Flink CDC through SQL requires Flink 1.13.x.</li>
 *   <li>With {@code 'scan.startup.mode' = 'initial'} the original test only saw the rows
 *       already present in the table and never observed later changes (same result from
 *       Java and Scala). The job did not enable checkpointing at that time; per the
 *       MySQL CDC connector documentation the source waits for a completed checkpoint
 *       before it switches from snapshot reading to binlog reading, so this job now
 *       enables checkpointing.</li>
 *   <li>{@code 'scan.startup.mode' = 'latest-offset'} was tested and works.</li>
 * </ol>
 */
public class FlinkCDCJavaSQL {

    /**
     * Checkpoint interval in milliseconds. A short interval keeps the wait between the
     * end of the snapshot phase and the start of change capture small for this demo.
     */
    private static final long CHECKPOINT_INTERVAL_MS = 3000L;

    /**
     * Registers a {@code mysql-cdc} source table, selects everything from it and prints
     * the resulting retract stream until the job is cancelled.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Without checkpoints the 'initial' startup mode stays in the snapshot phase and
        // never emits change events (see the class-level notes).
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink MySQL CDC source table.
        // Flink does not own the data, so NOT ENFORCED is the only primary-key mode it
        // supports and it is spelled out explicitly.
        // NOTE(review): host and credentials are local demo values; externalize them
        // before using this outside a sandbox.
        tableEnv.executeSql("create table user_info (" +
                "id string primary key not enforced," +
                "name STRING," +
                "sex STRING" +
                ")with(" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'localhost'," +
                "'port' = '3306'," +
                "'username' = 'root'," +
                "'password' = '123456'," +
                "'database-name' = 'flinkcdc'," +
                "'scan.startup.mode' = 'initial'," +
                "'table-name' = 'user_info'" +
                ")");

        // 3. Query the table and convert the changelog into a retract stream:
        // the Boolean flag is true for an added row and false for a retracted row.
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);

        retractStream.print();

        // 4. Submit the job; this blocks while the streaming job is running.
        env.execute();
    }

}
